package cn.doitedu.ml.bayes

import cn.doitedu.commons.util.SparkUtil
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

/**
 * @date: 2020/2/20
 * @site: www.doitedu.cn
 * @author: hunter.d 涛哥
 * @qq: 657270652
 * @description:
  *    明星出轨预测
  *    使用的算法：朴素贝叶斯
 */
object AmourBayesModuleTrainer {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkUtil.getSparkSession(this.getClass.getSimpleName)

    // 加载原始样本数据
    val sample = spark.read.option("header","true").csv("userprofile/data/demo/bayes/sample")

    /**
      * 加工特征
      * name,job,income,age,sex,label
      * 张飞,老师,中,青年,男,出轨
      * 赵云,老师,中,中年,女,出轨
      * 陆小凤,老师,低,青年,男,没出
      *
      * mllib中的算法，需要输入的特征值是double类型
      */
    // 特征要数值化
    val sample2 = sample.selectExpr(
      "cast(case job when '老师' then 1.0 when '程序员' then 2.0 else 3.0 end as double) as job",
      "cast(case income when '低' then 1.0 when '中' then 2.0 else 3.0 end as double) as income",
      "cast(case age when '青年' then 1.0 when '中年' then 2.0 else 3.0 end as double) as age",
      "cast(case sex when '男' then 1.0 else 2.0 end as double) as sex",
      // sparkmllib中的朴素贝叶斯算法，默认预测结果中的类别是用0.0开始编号的
      "cast(case label when '出轨' then 0.0 else 1.0 end as double) as label"
    )

    /**
      * +---+------+---+---+-----+
      * |job|income|age|sex|label|
      * +---+------+---+---+-----+
      * |1.0|2.0   |1.0|1.0|0.0  |
      * |1.0|2.0   |2.0|2.0|0.0  |
      * |1.0|1.0   |1.0|1.0|1.0  |
      * |1.0|3.0   |3.0|2.0|0.0  |
      */

    // 要将所有特征，组合到一个字段(Vector类型)中
    val to_vec = (arr:scala.collection.mutable.WrappedArray[Double])=>{Vectors.dense(arr.toArray)}
    spark.udf.register("to_vec",to_vec)


    val sample3 = sample2.selectExpr("to_vec(array(job,income,age,sex)) as features","label")
    // sample3.printSchema()
    // sample3.show(30,false)

    /*
    // 向量组装器，它的作用是，对一个df中，多个vector类型的字段，组装成一个vector
    // 比如： df_x ： vec1    vec2
    val assembler = new VectorAssembler()
      .setInputCols(Array("vec1","vec2"))
      .setOutputCol("features")

    val df_y = assembler.transform(df_x)
    df_y.printSchema()        // schema :   feautres[包含原来vec1/vec2中的所有特征]
    df_y.show(100,false)*/

    // 构造sparkmllib中的算法对象
    val naiveBayes = new NaiveBayes()
        .setFeaturesCol("features")  // 设置样本数据中，特征数据所在的字段
        .setLabelCol("label")   // 设置样本数据中，类别标签所在字段
        .setSmoothing(1.0)  // 拉普拉斯平滑系数

    // 输入样本给算法，进行模型训练
    val model: NaiveBayesModel = naiveBayes.fit(sample3)


    // 将模型结果保存
    model.save("userprofile/data/demo/bayes/model")
    println(model)


    spark.close()
  }

}
